[Feature] support v1 update/clear api for RL#6761
liyonghua0910 wants to merge 6 commits into PaddlePaddle:develop
Conversation
Thanks for your contribution!
Codecov Report

```
@@           Coverage Diff            @@
##           develop    #6761   +/-  ##
==========================================
  Coverage         ?   71.17%
==========================================
  Files            ?      395
  Lines            ?    54984
  Branches         ?     8678
==========================================
  Hits             ?    39137
  Misses           ?    13060
  Partials         ?     2787
```
cd16fe2 to 350a315
```python
self.proposer.clear_mtp_cache()
self.clear_cache()
paddle.device.cuda.empty_cache()
self.is_paused = True
```
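The teardown order above (what appears to be the speculative-decoding MTP cache, then the KV cache, then the CUDA allocator, then the paused flag) can be sketched without GPU dependencies. Everything below is a stand-in: the class, method names, and the event log are illustrative, and `paddle.device.cuda.empty_cache()` is replaced by a stub.

```python
class SleepableWorker:
    """Toy stand-in recording the teardown steps a sleeping worker performs."""

    def __init__(self):
        self.events = []
        self.is_paused = False

    def clear_mtp_cache(self):
        self.events.append("mtp_cache_cleared")  # speculative-decoding cache

    def clear_cache(self):
        self.events.append("kv_cache_cleared")

    def empty_cuda_cache(self):
        # Stub for paddle.device.cuda.empty_cache(): return freed blocks
        # from the caching allocator back to the driver.
        self.events.append("cuda_allocator_emptied")

    def sleep(self):
        # Drop model-level caches first, then empty the allocator,
        # then mark the worker paused, mirroring the diff above.
        self.clear_mtp_cache()
        self.clear_cache()
        self.empty_cuda_cache()
        self.is_paused = True


w = SleepableWorker()
w.sleep()
print(w.events)
# ['mtp_cache_cleared', 'kv_cache_cleared', 'cuda_allocator_emptied']
```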
```python
if app.state.dynamic_load_weight:
    status_code, msg = app.state.engine_client.clear_load_weight()
    return JSONResponse(content=msg, status_code=status_code)
if envs.FD_ENABLE_V1_UPDATE_WEIGHTS:
```

```python
if app.state.dynamic_load_weight:
    status_code, msg = app.state.engine_client.update_model_weight()
    return JSONResponse(content=msg, status_code=status_code)
if envs.FD_ENABLE_V1_UPDATE_WEIGHTS:
```
```python
self._post_init()
```

```python
def _post_init(self):
```

> This feels better maintained by each interface itself; keep the request logic as generic as possible.
```python
while self.running:
    try:
        with self._pause_cond:
            self._pause_cond.wait_for(lambda: not self.is_paused)
```

> Why does the output side need to be aware of pause at all? If there are no new requests, output has no new tokens to process; requests that are still in flight should be preempted by the scheduler and finish on their own, otherwise intermediate tokens may end up stuck in the output queue.
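The gating under discussion can be reproduced in isolation. The following is a minimal standalone sketch of the same condition-variable pause/resume pattern; the class and method names are hypothetical, not taken from this PR.

```python
import threading
import time


class PausableLoop:
    """Minimal pause/resume gate using a condition variable."""

    def __init__(self):
        self.running = True
        self.is_paused = False
        self._pause_cond = threading.Condition()
        self.processed = []

    def pause(self):
        with self._pause_cond:
            self.is_paused = True

    def resume(self):
        with self._pause_cond:
            self.is_paused = False
            self._pause_cond.notify_all()  # wake any workers blocked in wait_for

    def run(self, items):
        for item in items:
            if not self.running:
                break
            with self._pause_cond:
                # Block here while paused; resume() wakes us up.
                self._pause_cond.wait_for(lambda: not self.is_paused)
            self.processed.append(item)


loop = PausableLoop()
t = threading.Thread(target=loop.run, args=([1, 2, 3],))
loop.pause()
t.start()
time.sleep(0.1)                # worker is now blocked on the condition
assert loop.processed == []    # nothing processed while paused
loop.resume()
t.join(timeout=2)
print(loop.processed)          # [1, 2, 3]
```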
```python
engine_cache_queue_port = self.cfg.cache_config.local_cache_queue_port
name = f"ctrl_c2e_rank{tp_rank+tp_size*dp_index}_{engine_cache_queue_port}"
self.llm_logger.info(f"Init Cache Control Output Queue: {name} (consumer)")
self._ctrl_output_queues[name] = FMQ().queue(name, "consumer")
```

> It would be better to keep the control channels to the worker and to the cache transfer manager separate, since not every control signal is sent to both of them.
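The per-rank naming scheme in the snippet above can be checked in isolation. In this sketch the `tp_size` and port values are illustrative, not taken from the PR; only the name format mirrors the diff.

```python
tp_size = 4                     # illustrative tensor-parallel degree
engine_cache_queue_port = 8733  # illustrative port


def ctrl_queue_name(tp_rank, dp_index):
    # Global rank = tp_rank + tp_size * dp_index, as in the snippet above.
    return f"ctrl_c2e_rank{tp_rank + tp_size * dp_index}_{engine_cache_queue_port}"


print(ctrl_queue_name(1, 0))  # ctrl_c2e_rank1_8733
print(ctrl_queue_name(0, 2))  # ctrl_c2e_rank8_8733
```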
```python
result = asyncio.run(self._wait_for_control_responses(control_request.request_id, 60, executors=executors))
```

```python
# Resume the engine after wakeup
self._control_resume(None)
```
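The snippet above dispatches a control request and waits for per-executor responses. A minimal sketch of request/response pairing keyed by request ID follows; all class and function names here are hypothetical stand-ins, not the PR's actual implementation.

```python
import queue
import uuid
from dataclasses import dataclass, field


@dataclass
class ControlRequest:
    task: str
    # A unique ID lets the request be traced through logs end to end.
    request_id: str = field(default_factory=lambda: uuid.uuid4().hex)


@dataclass
class ControlResponse:
    request_id: str
    success: bool


def wait_for_control_responses(resp_queue, request_id, n_executors, timeout=60):
    """Collect one response per executor for a given request ID."""
    responses = []
    while len(responses) < n_executors:
        resp = resp_queue.get(timeout=timeout)
        if resp.request_id == request_id:  # match responses to this request
            responses.append(resp)
    return responses


req = ControlRequest(task="sleep")
q = queue.Queue()
# Simulate two executors acknowledging the same control request.
for _ in range(2):
    q.put(ControlResponse(request_id=req.request_id, success=True))
result = wait_for_control_responses(q, req.request_id, n_executors=2)
print(all(r.success for r in result))  # True
```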
> Do sleep and wake_up need to include pause and resume internally? Could the upstream controller call them instead, so that the semantics of sleep and wake_up stay clearer?
> This is partly meant as a safety net, and partly for compatibility with the legacy API semantics: one call to clear_load_weight plus one call to sleep should achieve the same effect. It probably does need to be split, though; a parameter could control whether sleep implicitly includes pause.
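The compromise suggested in the reply above, a flag controlling whether `sleep` implicitly pauses, can be sketched as follows. The class, methods, and flags are hypothetical illustrations of the idea, not code from this PR.

```python
class EngineControl:
    """Toy model of sleep/wake_up with optional implicit pause/resume."""

    def __init__(self):
        self.paused = False
        self.weights_loaded = True

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def sleep(self, include_pause=True):
        # Safety net: optionally pause before offloading, so a bare sleep()
        # keeps the legacy clear_load_weight semantics.
        if include_pause:
            self.pause()
        self.weights_loaded = False

    def wake_up(self, include_resume=True):
        self.weights_loaded = True
        if include_resume:
            self.resume()


ctl = EngineControl()
ctl.sleep()                      # legacy-compatible: pauses implicitly
assert ctl.paused and not ctl.weights_loaded
ctl.wake_up()
assert not ctl.paused and ctl.weights_loaded
ctl.sleep(include_pause=False)   # upstream controller manages pause itself
print(ctl.paused)                # False
```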
Motivation
This PR upgrades the weight clearing and updating flow for RL scenarios.
The legacy control path mainly relied on shared memory to synchronize state across the engine, worker, and cache-related components. While functional, the signal path was not explicit enough, and it was difficult to trace how failed requests were handled across components. In addition, the old workflow usually cleared weights through `clear_load_weight` first, even though residual requests could still exist, and then relied on a manual `reset_scheduler` call to clean up the scheduler queue. This made the lifecycle less explicit and introduced risks of inconsistent states during asynchronous resource recycling.

The goal of this PR is to move the control flow to an explicit control-request path and replace the legacy weight clear/reload flow with the new sleep/wakeup workflow, so state transitions and troubleshooting become more straightforward.

Modifications

- Introduce an explicit `ControlRequest`/`ControlResponse` path, so each control request has its own request ID and can be traced through logs end to end.
- Add `/v1/sleep` and `/v1/wakeup`, with `tags` support to specify which part of GPU memory should be offloaded or reloaded. Enable these APIs by `export FD_ENABLE_V1_UPDATE_WEIGHTS=1`.
- Keep `/clear_load_weight` and `/update_model_weight` for compatibility:
  - With `FD_ENABLE_V1_UPDATE_WEIGHTS=0`, `/clear_load_weight` and `/update_model_weight` still rely on shared memory for control and multi-process synchronization.
  - With `FD_ENABLE_V1_UPDATE_WEIGHTS=1`, `/clear_load_weight` and `/update_model_weight` switch to the new control path, using the engine worker queue, engine cache queue, and FMQ for request dispatch and response collection.
- Add `/v1/pause` and `/v1/resume` with cache-transfer-manager coordination to support multi-level cache and KV-cache-backend scenarios.

Usage or Command
Export the following environment variable when starting the server:

```shell
export FD_ENABLE_V1_UPDATE_WEIGHTS=1
```

Send control requests:
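The concrete request examples were not captured in this description. As a rough sketch, the control endpoints listed above could be exercised as follows; the base URL and the payload shapes (in particular the `tags` values) are assumptions for illustration, not taken from the PR.

```python
import json

# Base URL is an assumption; adjust to your deployment.
base = "http://localhost:8188"

# Control endpoints from this PR; `tags` selects which part of GPU memory
# to offload/reload for sleep/wakeup (payload shape is illustrative).
calls = [
    ("POST", f"{base}/v1/sleep", {"tags": ["weights", "kv_cache"]}),
    ("POST", f"{base}/v1/wakeup", {"tags": ["weights"]}),
    ("POST", f"{base}/v1/pause", {}),
    ("POST", f"{base}/v1/resume", {}),
]
for method, url, body in calls:
    # Print the shape of each control request instead of sending it,
    # so the sketch runs without a live server.
    print(method, url, json.dumps(body))
```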
Accuracy Tests
Checklist
- Run `pre-commit` before commit.
- Before merging into the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.